1
//--------------------------------------------------------------------------
3 // Copyright (c) Microsoft Corporation. All rights reserved.
7 //--------------------------------------------------------------------------
9 using System
.Collections
.Concurrent
;
10 using System
.Collections
.Generic
;
11 using System
.Diagnostics
;
13 namespace System
.Threading
.Tasks
15 /// <summary>Asynchronously invokes a handler for every posted item.</summary>
16 /// <typeparam name="T">Specifies the type of data processed by the instance.</typeparam>
17 public sealed class AsyncCall
<T
> : MarshalByRefObject
20 /// A queue that stores the posted data. Also serves as the syncObj for protected instance state.
21 /// A ConcurrentQueue is used to enable lock-free dequeues while running with a single consumer task.
23 private readonly ConcurrentQueue
<T
> _queue
;
24 /// <summary>The delegate to invoke for every element.</summary>
25 private readonly Delegate _handler
;
26 /// <summary>The maximum number of items that should be processed by an individual task.</summary>
27 private readonly int _maxItemsPerTask
;
28 /// <summary>The TaskFactory to use to launch new tasks.</summary>
29 private readonly TaskFactory _tf
;
30 /// <summary>The options to use for parallel processing of data.</summary>
31 private readonly ParallelOptions _parallelOptions
;
32 /// <summary>Whether a processing task has been scheduled.</summary>
33 private int _processingCount
;
35 /// <summary>Initializes the AsyncCall with an action to execute for each element.</summary>
36 /// <param name="actionHandler">The action to run for every posted item.</param>
37 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
38 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
39 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
40 public AsyncCall(Action
<T
> actionHandler
, int maxDegreeOfParallelism
= 1, int maxItemsPerTask
= Int32
.MaxValue
, TaskScheduler scheduler
= null) :
41 this(maxDegreeOfParallelism
, maxItemsPerTask
, scheduler
)
43 if (actionHandler
== null) throw new ArgumentNullException("handler");
44 _handler
= actionHandler
;
48 /// Initializes the AsyncCall with a function to execute for each element. The function returns an Task
49 /// that represents the asynchronous completion of that element's processing.
51 /// <param name="functionHandler">The function to run for every posted item.</param>
52 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
53 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
54 public AsyncCall(Func
<T
,Task
> functionHandler
, int maxDegreeOfParallelism
= 1, TaskScheduler scheduler
= null) :
55 this(maxDegreeOfParallelism
, 1, scheduler
)
57 if (functionHandler
== null) throw new ArgumentNullException("handler");
58 _handler
= functionHandler
;
61 /// <summary>General initialization of the AsyncCall. Another constructor must initialize the delegate.</summary>
62 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
63 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
64 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
65 private AsyncCall(int maxDegreeOfParallelism
= 1, int maxItemsPerTask
= Int32
.MaxValue
, TaskScheduler scheduler
= null)
68 if (maxDegreeOfParallelism
< 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
69 if (maxItemsPerTask
< 1) throw new ArgumentOutOfRangeException("maxItemsPerTask");
70 if (scheduler
== null) scheduler
= TaskScheduler
.Default
;
72 // Configure the instance
73 _queue
= new ConcurrentQueue
<T
>();
74 _maxItemsPerTask
= maxItemsPerTask
;
75 _tf
= new TaskFactory(scheduler
);
76 if (maxDegreeOfParallelism
!= 1)
78 _parallelOptions
= new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism, TaskScheduler = scheduler }
;
82 /// <summary>Post an item for processing.</summary>
83 /// <param name="item">The item to be processed.</param>
84 public void Post(T item
)
88 // Add the item to the internal queue
91 // Check to see whether the right number of tasks have been scheduled.
92 // If they haven't, schedule one for this new piece of data.
93 if (_handler
is Action
<T
>)
95 if (_processingCount
== 0)
98 _tf
.StartNew(ProcessItemsActionTaskBody
);
101 else if (_handler
is Func
<T
, Task
>)
103 if (_processingCount
== 0 || // is anyone at all currently processing?
104 (_parallelOptions
!= null && _processingCount
< _parallelOptions
.MaxDegreeOfParallelism
&& // are enough workers currently processing?
105 !_queue
.IsEmpty
)) // and, as an optimization, double check to make sure the item hasn't already been picked up by another worker
108 _tf
.StartNew(ProcessItemFunctionTaskBody
, null);
111 else Debug
.Fail("_handler is an invalid delegate type");
115 /// <summary>Gets an enumerable that yields the items to be processed at this time.</summary>
116 /// <returns>An enumerable of items.</returns>
117 private IEnumerable
<T
> GetItemsToProcess()
119 // Yield the next elements to be processed until either there are no more elements
120 // or we've reached the maximum number of elements that an individual task should process.
121 int processedCount
= 0;
123 while (processedCount
< _maxItemsPerTask
&& _queue
.TryDequeue(out nextItem
))
125 yield return nextItem
;
130 /// <summary>Used as the body of an action task to process items in the queue.</summary>
131 private void ProcessItemsActionTaskBody()
136 Action
<T
> handler
= (Action
<T
>)_handler
;
138 // Process up to _maxItemsPerTask items, either serially or in parallel
139 // based on the provided maxDegreeOfParallelism (which determines
140 // whether a ParallelOptions is instantiated).
141 if (_parallelOptions
== null)
142 foreach (var item
in GetItemsToProcess()) handler(item
);
144 Parallel
.ForEach(GetItemsToProcess(), _parallelOptions
, handler
);
150 // If there are still items in the queue, schedule another task to continue processing.
151 // Otherwise, note that we're no longer processing.
152 if (!_queue
.IsEmpty
) _tf
.StartNew(ProcessItemsActionTaskBody
, TaskCreationOptions
.PreferFairness
);
153 else _processingCount
= 0;
158 /// <summary>Used as the body of a function task to process items in the queue.</summary>
159 private void ProcessItemFunctionTaskBody(object ignored
)
161 bool anotherTaskQueued
= false;
165 Func
<T
, Task
> handler
= (Func
<T
, Task
>)_handler
;
167 // Get the next item from the queue to process
169 if (_queue
.TryDequeue(out nextItem
))
171 // Run the handler and get the follow-on task.
172 // If we got a follow-on task, run this process again when the task completes.
173 // If we didn't, just start another task to keep going now.
174 var task
= handler(nextItem
);
175 if (task
!= null) task
.ContinueWith(ProcessItemFunctionTaskBody
, _tf
.Scheduler
);
176 else _tf
.StartNew(ProcessItemFunctionTaskBody
, null);
178 // We've queued a task to continue processing, which means that logically
179 // we're still maintaining the same level of parallelism.
180 anotherTaskQueued
= true;
185 // If we didn't queue up another task to continue processing (either
186 // because an exception occurred, or we failed to grab an item from the queue)
187 if (!anotherTaskQueued
)
191 // Verify that there's still nothing in the queue, now under the same
192 // lock that the queuer needs to take in order to increment the processing count
193 // and launch a new processor.
194 if (!_queue
.IsEmpty
) _tf
.StartNew(ProcessItemFunctionTaskBody
, null);
195 else _processingCount
--;
202 /// <summary>Provides static factory methods for creating AsyncCall(Of T) instances.</summary>
203 public static class AsyncCall
205 /// <summary>Initializes the AsyncCall with an action to execute for each element.</summary>
206 /// <param name="actionHandler">The action to run for every posted item.</param>
207 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
208 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
209 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
210 public static AsyncCall
<T
> Create
<T
>(Action
<T
> actionHandler
, int maxDegreeOfParallelism
= 1, int maxItemsPerTask
= Int32
.MaxValue
, TaskScheduler scheduler
= null)
212 return new AsyncCall
<T
>(actionHandler
, maxDegreeOfParallelism
, maxItemsPerTask
, scheduler
);
216 /// Initializes the AsyncCall with a function to execute for each element. The function returns an Task
217 /// that represents the asynchronous completion of that element's processing.
219 /// <param name="functionHandler">The function to run for every posted item.</param>
220 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
221 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
222 /// <param name="scheduler">The scheduler to use. If null, the default scheduler is used.</param>
223 public static AsyncCall
<T
> Create
<T
>(Func
<T
, Task
> functionHandler
, int maxDegreeOfParallelism
= 1, TaskScheduler scheduler
= null)
225 return new AsyncCall
<T
>(functionHandler
, maxDegreeOfParallelism
, scheduler
);
228 /// <summary>Initializes the AsyncCall in the specified AppDomain with an action to execute for each element.</summary>
229 /// <param name="actionHandler">The action to run for every posted item.</param>
230 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
231 /// <param name="maxItemsPerTask">The maximum number of items to be processed per task. If not specified, Int32.MaxValue is used.</param>
232 public static AsyncCall
<T
> CreateInTargetAppDomain
<T
>(AppDomain targetDomain
, Action
<T
> actionHandler
, int maxDegreeOfParallelism
= 1, int maxItemsPerTask
= Int32
.MaxValue
)
234 return (AsyncCall
<T
>)targetDomain
.CreateInstanceAndUnwrap(
235 typeof(AsyncCall
<T
>).Assembly
.FullName
, typeof(AsyncCall
<T
>).FullName
,
236 false, Reflection
.BindingFlags
.CreateInstance
, null,
237 new object[] { actionHandler, maxDegreeOfParallelism, maxItemsPerTask, null }
,
242 /// Initializes the AsyncCall in the specified AppDomain with a function to execute for each element.
243 /// The function returns an Task that represents the asynchronous completion of that element's processing.
245 /// <param name="functionHandler">The action to run for every posted item.</param>
246 /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism to use. If not specified, 1 is used for serial execution.</param>
247 public static AsyncCall
<T
> CreateInTargetAppDomain
<T
>(AppDomain targetDomain
, Func
<T
, Task
> functionHandler
, int maxDegreeOfParallelism
= 1)
249 return (AsyncCall
<T
>)targetDomain
.CreateInstanceAndUnwrap(
250 typeof(AsyncCall
<T
>).Assembly
.FullName
, typeof(AsyncCall
<T
>).FullName
,
251 false, Reflection
.BindingFlags
.CreateInstance
, null,
252 new object[] { functionHandler, maxDegreeOfParallelism, null }
,